DartVM ReceivePort
介绍
Isolate 间通过 ReceivePort 和 SendPort 实现通信。这种 Port 机制,在 DartVM 底层有着更加深入的应用,这种机制也与 Dart 的消息队列机制关联非常密切。
类关系
ReceivePort 相关联的类有点多,并且跨 Dart 和 C/C++,看着有点乱,梳理如下图:
ReceivePort Dart 抽象类
该类位于 sdk/lib/isolate/isolate.dart,是一个抽象类。
类注释
与 SendPort 一起使用,是 isolate 间通信的唯一方式。
ReceivePort 拥有一个 sendPort getter 方法,用于获取 SendPort。通过 SendPort,能够向其关联的 ReceivePort 发送消息。ReceivePort 实现了 Stream 接口,通过对 ReceivePort 进行 listen,能够进行数据监听。
从 Stream 角度,ReceivePort 是一个非广播 stream,只能有一个 listener 监听消息。不过,通过包装 Stream.asBroadcastStream,可以实现广播。
工厂方法1
开启一个长期活跃的端口,用于接收消息。
external factory ReceivePort([String debugName = '']);
其中:
- debugName:参与用于调试工具展示
- 如果取消订阅,端口也会关闭
工厂方法2
基于 RawReceivePort 创建 ReceivePort。
external factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort);
工厂方法实现
两个工厂方法都是只有 external 声明,具体实现位于 sdk/lib/_internal/lib/isolate_patch.dart:
@patch
class ReceivePort {
@patch
factory ReceivePort([String debugName = '']) =>
new _ReceivePortImpl(debugName);
@patch
factory ReceivePort.fromRawReceivePort(RawReceivePort rawPort) {
return new _ReceivePortImpl.fromRawReceivePort(rawPort);
}
}
可以看到,通过工厂方法,指向了具体的实现类 _ReceivePortImpl。
listen 监听
ReceivePort 实现了 stream 接口,因此提供 listen 方法用于进行监听。
StreamSubscription<dynamic> listen(
viod onData(var message)?,
{
Function? onError,
void onDone()?
bool? cancelOnError
});
其中:
- onError 和 cancelOnError 是不起作用的,因为 ReceivePort 永远不会收到 error
- onDone 回调会在 close 调用后调用
close 关闭
关闭 receive port。
关闭之后,就收不到后续消息了。
void close();
sendPort getter
获取 sendPort:
SendPort get sendPort;
拿到 sendPort,才能向 receive port 发送消息。
_ReceivePortImpl
该类位于 sdk/lib/_internal/vm/lib/isolate_patch.dart,具体实现如下:
class _ReceivePortImpl extends Stream implements ReceivePort {
_ReceivePortImpl([String debugName = ''])
: this.fromRawReceivePort(new RawReceivePort(null, debugName));
_ReceivePortImpl.fromRawReceivePort(this._rawPort)
: _controller = new StreamController(sync: true) {
_controller.onCancel = close;
_rawPort.handler = _controller.add;
}
SendPort get sendPort {
return _rawPort.sendPort;
}
StreamSubscription listen(void onData(var message)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
close() {
_rawPort.close();
_controller.close();
}
final RawReceivePort _rawPort;
final StreamController _controller;
}
两个构造方法
可以看到都是指向了 fromRawReceivePort。
对于默认构造函数,创建了一个新的 RawReceivePort 实例,并存入 _rawPort 成员。
构造函数中还有一点,是创建了 Stream 的控制器。并且把 streamController 的 add 方法引用传给 _rawPort 的 handler。
这样 RawReceivePort 只要收到信息,就会自动抛到 Stream 上面去。
从中还能看出,在 Dart 侧创建一个 ReceivePort,其实内部是需要走到底层的。
RawReceivePort
这部分内容单独放到 DartVM RawReceivePort 进行总结。